package com.my.service.task;

import com.my.service.task.entity.UserGroupInfo;
import com.my.service.task.kmeans.Cluster;
import com.my.service.task.kmeans.KMeansRunbyusergroup;
import com.my.service.task.kmeans.Point;
import com.my.service.task.map.KMeansFinalUserGroupMap;
import com.my.service.task.map.UserGroupMap;
import com.my.service.task.map.UserGroupSecondMap;
import com.my.service.task.reduce.UserGroupInfoReduce;
import com.my.service.task.reduce.UserGroupbykmeansReduce;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.*;


/**
 * Flink batch job that clusters users into groups via a two-stage k-means:
 * per-user log aggregation, a distributed k-means within 100 random
 * partitions, then a client-side k-means over the 100 * k candidate
 * centers to obtain the final centers used to assign every user a group.
 *
 * <p>Arguments: {@code --input <path>} (required log file),
 * {@code --clusters <n>} (optional final cluster count, default 6).
 */
public class UserGroupTask {

    /** Default number of final user-group clusters; override with {@code --clusters}. */
    private static final int DEFAULT_CLUSTER_COUNT = 6;

    /** First id assigned to the final cluster-center points. */
    private static final int FINAL_CENTER_ID_OFFSET = 100;

    public static void main(String[] args) {
        final ParameterTool params = ParameterTool.fromArgs(args);

        // Set up the batch execution environment.
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Make parameters available in the web interface.
        env.getConfig().setGlobalJobParameters(params);

        // Read the raw log lines.
        DataSet<String> text = env.readTextFile(params.get("input"));

        // 1. Parse each raw line into a UserGroupInfo; groupfield is the userid.
        DataSet<UserGroupInfo> mapResult = text.map(new UserGroupMap());
        // 2. Group the logs by userid and reduce each user's records into a single UserGroupInfo.
        DataSet<UserGroupInfo> reduceResult =
                mapResult.groupBy("groupfield").reduce(new UserGroupInfoReduce());
        // 3. Derive per-user features from the aggregated list; groupfield is reassigned
        //    to one of 100 random, non-overlapping partition numbers.
        DataSet<UserGroupInfo> partitionedUsers = reduceResult.map(new UserGroupSecondMap());
        // 4. Run k-means inside each of the 100 partitions, yielding 100 * k candidate centers.
        DataSet<ArrayList<Point>> partialCenters =
                partitionedUsers.groupBy("groupfield").reduceGroup(new UserGroupbykmeansReduce());

        try {
            final int clusterCount = params.getInt("clusters", DEFAULT_CLUSTER_COUNT);

            // Pull the candidate centers back to the client. Note: collect() itself
            // triggers a Flink job execution.
            List<ArrayList<Point>> partialCenterLists = partialCenters.collect();
            ArrayList<float[]> candidatePoints = new ArrayList<>();
            for (ArrayList<Point> centers : partialCenterLists) {
                for (Point point : centers) {
                    candidatePoints.add(point.getlocalArray());
                }
            }

            // Cluster the 100 * k candidate centers again to obtain the final centers.
            KMeansRunbyusergroup finalKMeans = new KMeansRunbyusergroup(clusterCount, candidatePoints);
            Set<Cluster> clusterSet = finalKMeans.run();
            List<Point> finalClusterCenters = new ArrayList<>(clusterSet.size());
            int centerId = FINAL_CENTER_ID_OFFSET;
            for (Cluster cluster : clusterSet) {
                Point center = cluster.getCenter();
                center.setId(centerId++);
                finalClusterCenters.add(center);
            }

            // Assign every user to its nearest final center. The mapper persists the
            // result (to HBase, per the original comment — TODO confirm) as a side
            // effect, so the discarding sink exists only to force the map to run:
            // without any sink, env.execute() fails with
            // "No new data sinks have been defined".
            partitionedUsers
                    .map(new KMeansFinalUserGroupMap(finalClusterCenters))
                    .output(new DiscardingOutputFormat<>());
            env.execute("UserGroupTask analysis");
        } catch (Exception e) {
            // Fail loudly with the cause preserved instead of swallowing the error:
            // a Flink or k-means failure must not let the job exit with status 0.
            throw new RuntimeException("UserGroupTask analysis failed", e);
        }
    }
}
